package double_stream_join;

import java.io.IOException;
import java.util.stream.Stream;

/**
 * Command-line entry point that wires a {@code UserBehaviorCsvFileReader}
 * (used as the stream's supplier) to a {@code KafkaProducer} (used as the
 * stream's consumer).
 *
 * <p>Both of those classes are declared elsewhere in the project; judging by
 * their names and the original comments, the reader yields records from the
 * CSV file and the producer publishes each one to Kafka - confirm against
 * their sources.
 */
public class SendMessageApplication {
    /** Location of the CSV input file (relative path). */
    private static final String CSV_FILE_PATH = "src/main/resources/UserBehavior.csv";

    /** Kafka topic name handed to the producer. */
    private static final String KAFKA_TOPIC = "userbehavior";

    /** Kafka broker address handed to the producer. */
    private static final String KAFKA_BROKER = "hadoop102:9092";

    /**
     * Builds the reader and the producer, then pushes every element supplied
     * by the reader into the producer, one at a time and in order.
     *
     * @param args command-line arguments; not used
     * @throws IOException declared by the original signature; presumably raised
     *                     while opening the CSV file - confirm against the reader
     */
    public static void main(String[] args) throws IOException {
        // Source side: handed to Stream.generate below as the element supplier.
        UserBehaviorCsvFileReader csvSource = new UserBehaviorCsvFileReader(CSV_FILE_PATH);

        // Sink side: receives each generated element.
        KafkaProducer kafkaSink = new KafkaProducer(KAFKA_TOPIC, KAFKA_BROKER);

        // Stream.generate never ends by itself, so this call only returns
        // abnormally, i.e. when the supplier or the consumer throws.
        // NOTE(review): how end-of-file is signalled depends on the reader - confirm.
        Stream.generate(csvSource)
                .sequential()
                .forEachOrdered(kafkaSink);
    }
}
